1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.junit.Assert.*;
19
20 import java.util.Arrays;
21 import java.util.concurrent.*;
22 import java.util.concurrent.atomic.*;
23
24 import org.junit.Test;
25
26 import rx.*;
27 import rx.Observable.OnSubscribe;
28 import rx.functions.*;
29 import rx.internal.util.RxRingBuffer;
30 import rx.observables.ConnectableObservable;
31 import rx.observers.TestSubscriber;
32 import rx.schedulers.Schedulers;
33 import rx.schedulers.TestScheduler;
34
35 public class OperatorPublishTest {
36
37 @Test
38 public void testPublish() throws InterruptedException {
39 final AtomicInteger counter = new AtomicInteger();
40 ConnectableObservable<String> o = Observable.create(new OnSubscribe<String>() {
41
42 @Override
43 public void call(final Subscriber<? super String> observer) {
44 new Thread(new Runnable() {
45
46 @Override
47 public void run() {
48 counter.incrementAndGet();
49 observer.onNext("one");
50 observer.onCompleted();
51 }
52 }).start();
53 }
54 }).publish();
55
56 final CountDownLatch latch = new CountDownLatch(2);
57
58
59 o.subscribe(new Action1<String>() {
60
61 @Override
62 public void call(String v) {
63 assertEquals("one", v);
64 latch.countDown();
65 }
66 });
67
68
69 o.subscribe(new Action1<String>() {
70
71 @Override
72 public void call(String v) {
73 assertEquals("one", v);
74 latch.countDown();
75 }
76 });
77
78 Subscription s = o.connect();
79 try {
80 if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
81 fail("subscriptions did not receive values");
82 }
83 assertEquals(1, counter.get());
84 } finally {
85 s.unsubscribe();
86 }
87 }
88
89 @Test
90 public void testBackpressureFastSlow() {
91 ConnectableObservable<Integer> is = Observable.range(1, RxRingBuffer.SIZE * 2).publish();
92 Observable<Integer> fast = is.observeOn(Schedulers.computation()).doOnCompleted(new Action0() {
93
94 @Override
95 public void call() {
96 System.out.println("^^^^^^^^^^^^^ completed FAST");
97 }
98
99 });
100 Observable<Integer> slow = is.observeOn(Schedulers.computation()).map(new Func1<Integer, Integer>() {
101 int c = 0;
102
103 @Override
104 public Integer call(Integer i) {
105 if (c == 0) {
106 try {
107 Thread.sleep(500);
108 } catch (InterruptedException e) {
109 }
110 }
111 c++;
112 return i;
113 }
114
115 }).doOnCompleted(new Action0() {
116
117 @Override
118 public void call() {
119 System.out.println("^^^^^^^^^^^^^ completed SLOW");
120 }
121
122 });
123
124 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
125 Observable.merge(fast, slow).subscribe(ts);
126 is.connect();
127 ts.awaitTerminalEvent();
128 ts.assertNoErrors();
129 assertEquals(RxRingBuffer.SIZE * 4, ts.getOnNextEvents().size());
130 }
131
132
133 @Test
134 public void testTakeUntilWithPublishedStreamUsingSelector() {
135 final AtomicInteger emitted = new AtomicInteger();
136 Observable<Integer> xs = Observable.range(0, RxRingBuffer.SIZE * 2).doOnNext(new Action1<Integer>() {
137
138 @Override
139 public void call(Integer t1) {
140 emitted.incrementAndGet();
141 }
142
143 });
144 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
145 xs.publish(new Func1<Observable<Integer>, Observable<Integer>>() {
146
147 @Override
148 public Observable<Integer> call(Observable<Integer> xs) {
149 return xs.takeUntil(xs.skipWhile(new Func1<Integer, Boolean>() {
150
151 @Override
152 public Boolean call(Integer i) {
153 return i <= 3;
154 }
155
156 }));
157 }
158
159 }).subscribe(ts);
160 ts.awaitTerminalEvent();
161 ts.assertNoErrors();
162 ts.assertReceivedOnNext(Arrays.asList(0, 1, 2, 3));
163 assertEquals(5, emitted.get());
164 System.out.println(ts.getOnNextEvents());
165 }
166
167
168 @Test
169 public void testTakeUntilWithPublishedStream() {
170 Observable<Integer> xs = Observable.range(0, RxRingBuffer.SIZE * 2);
171 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
172 ConnectableObservable<Integer> xsp = xs.publish();
173 xsp.takeUntil(xsp.skipWhile(new Func1<Integer, Boolean>() {
174
175 @Override
176 public Boolean call(Integer i) {
177 return i <= 3;
178 }
179
180 })).subscribe(ts);
181 xsp.connect();
182 System.out.println(ts.getOnNextEvents());
183 }
184
185 @Test(timeout = 10000)
186 public void testBackpressureTwoConsumers() {
187 final AtomicInteger sourceEmission = new AtomicInteger();
188 final AtomicBoolean sourceUnsubscribed = new AtomicBoolean();
189 final Observable<Integer> source = Observable.range(1, 100)
190 .doOnNext(new Action1<Integer>() {
191 @Override
192 public void call(Integer t1) {
193 sourceEmission.incrementAndGet();
194 }
195 })
196 .doOnUnsubscribe(new Action0() {
197 @Override
198 public void call() {
199 sourceUnsubscribed.set(true);
200 }
201 }).share();
202 ;
203
204 final AtomicBoolean child1Unsubscribed = new AtomicBoolean();
205 final AtomicBoolean child2Unsubscribed = new AtomicBoolean();
206
207 final TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
208
209 final TestSubscriber<Integer> ts1 = new TestSubscriber<Integer>() {
210 @Override
211 public void onNext(Integer t) {
212 if (getOnNextEvents().size() == 2) {
213 source.doOnUnsubscribe(new Action0() {
214 @Override
215 public void call() {
216 child2Unsubscribed.set(true);
217 }
218 }).take(5).subscribe(ts2);
219 }
220 super.onNext(t);
221 }
222 };
223
224 source.doOnUnsubscribe(new Action0() {
225 @Override
226 public void call() {
227 child1Unsubscribed.set(true);
228 }
229 }).take(5).subscribe(ts1);
230
231 ts1.awaitTerminalEvent();
232 ts2.awaitTerminalEvent();
233
234 ts1.assertNoErrors();
235 ts2.assertNoErrors();
236
237 assertTrue(sourceUnsubscribed.get());
238 assertTrue(child1Unsubscribed.get());
239 assertTrue(child2Unsubscribed.get());
240
241 ts1.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
242 ts2.assertReceivedOnNext(Arrays.asList(4, 5, 6, 7, 8));
243
244 assertEquals(8, sourceEmission.get());
245 }
246
247 @Test
248 public void testConnectWithNoSubscriber() {
249 TestScheduler scheduler = new TestScheduler();
250 ConnectableObservable<Long> co = Observable.timer(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish();
251 co.connect();
252
253 scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
254 TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
255 co.subscribe(subscriber);
256
257 scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
258 subscriber.assertReceivedOnNext(Arrays.asList(1L, 2L));
259 subscriber.assertNoErrors();
260 subscriber.assertTerminalEvent();
261 }
262 }